[CELEBORN-2313] Extend E2E checked zone to batch assembly point#3670
[CELEBORN-2313] Extend E2E checked zone to batch assembly point#3670xumingming wants to merge 2 commits into
Conversation
cf055e4 to
a4ee01e
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #3670 +/- ##
==========================================
+ Coverage 66.91% 67.05% +0.15%
==========================================
Files 358 359 +1
Lines 21986 22197 +211
Branches 1946 1970 +24
==========================================
+ Hits 14710 14883 +173
- Misses 6262 6292 +30
- Partials 1014 1022 +8 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Extends Celeborn’s E2E shuffle integrity “checked zone” upstream by computing per-batch CRC immediately after batch assembly (writer thread) rather than inside the async DataPusher pipeline, aiming to detect corruption occurring between batch assembly and async dispatch.
Changes:
- Adds
ShuffleClient.computeBatchCRC()and implements it inShuffleClientImpl(no-op inDummyShuffleClient). - Moves CRC accumulation call sites into Spark shuffle writers and
SortBasedPusherright before enqueue/push/merge. - Adds a unit test validating CRC accumulation behavior.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java | Adds UT for computeBatchCRC() accumulation behavior. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java | Removes CRC accumulation from pushOrMergeData() and adds computeBatchCRC() implementation. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClient.java | Introduces the new computeBatchCRC() API with Javadoc. |
| client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java | Implements new abstract method as a no-op. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Calls computeBatchCRC() for giant-record pushes. |
| client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Calls computeBatchCRC() before enqueue and per-partition final flush. |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java | Calls computeBatchCRC() for giant-record pushes. |
| client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | Calls computeBatchCRC() before enqueue and per-partition final flush. |
| client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java | Calls computeBatchCRC() before partition-change flush, overflow flush, and final flush. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Thanks for the fix! I have been thinking about this change as well. I am also interested in learning what other folks in the community think as well about the current explicit call approach from all writer call sites. |
|
One option might to have 2 checksums (from both writer and shuffle client) on the client as well, and then compare them to be the same before sending the metadata. If there is a mismatch, then fail the task -> would catch both cases - that either a call site has been missed or that DataPusher did not push all data |
|
Thanks for this PR — the analysis of the coverage gap is spot-on. I have a suggestion on the API design that could make it more maintainable. Concern: Implicit CRC contractThe current approach requires every call site to remember Suggestion: Encapsulate CRC + push as atomic operationsIntroduce // ShuffleClient.java
public int pushDataWithCRC(int shuffleId, int mapId, int attemptId,
int partitionId, byte[] data, int offset, int length,
int numMappers, int numPartitions) throws IOException {
computeBatchCRC(shuffleId, mapId, attemptId, partitionId, data, offset, length);
return pushData(shuffleId, mapId, attemptId, partitionId,
data, offset, length, numMappers, numPartitions);
}
public int mergeDataWithCRC(int shuffleId, int mapId, int attemptId,
int partitionId, byte[] data, int offset, int length,
int numMappers, int numPartitions) throws IOException {
computeBatchCRC(shuffleId, mapId, attemptId, partitionId, data, offset, length);
return mergeData(shuffleId, mapId, attemptId, partitionId,
data, offset, length, numMappers, numPartitions);
}
// For the DataPusher path
public void addTaskWithCRC(int shuffleId, int mapId, int attemptId,
int partitionId, byte[] buffer, int size) throws InterruptedException {
computeBatchCRC(shuffleId, mapId, attemptId, partitionId, buffer, 0, size);
dataPusher.addTask(partitionId, buffer, size);
}Then the 7 writer call sites each become a single call — no way to forget CRC: // Before (two operations, easy to miss CRC)
shuffleClient.computeBatchCRC(shuffleId, mapId, attemptId, partitionId, buffer, 0, size);
dataPusher.addTask(partitionId, buffer, size);
// After (one atomic operation)
shuffleClient.addTaskWithCRC(shuffleId, mapId, attemptId, partitionId, buffer, size);The bare This doesn't eliminate the 7 call sites, but it makes them self-contained and impossible to get wrong. The invariant shifts from "callers must remember to compute CRC first" (implicit, fragile) to "use the CRC-inclusive API" (explicit, hard to misuse). Other than that, a couple of minor notes on the test:
Review generated with the assistance of Claude AI |
@gauravkm interesting idea! Failing the task at the mapper side is far cheaper than discovering corruption at reducers and rerunning the entire job. We can explore this as a follow-up improvement. |
|
@RexXiong Thanks for the review. After some thinking, I find the suggestion you provided to be not feasible. The reason I want to separate CRC calculation from pushOrMergeData is because current CRC calculation is too late, so I separate it from pushOrMergeData to call it at a earlier call site. The call site could be:
For adding pushDataWithCRC to replace pushData, it is doable technically, but it is actually similar to original pushOrMergeData, which is where we were from. Even if we have added all the xxxWithCRC methods. The methods in ShuffleClient will be:
It seems more complicated than my current solution. How do you think? |
@gauravkm Interesting idea! Looking forward to detailed proposal. |
Celeborn's E2E integrity check computes CRC_M inside `ShuffleClientImpl.pushOrMergeData()`, which runs in the async `DataPusher` thread. This leaves the segment from batch assembly in the writer thread through the `DataPusher` queue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently. This change closes that gap and enables detection of a class of correctness bugs where data corruption occurs between batch assembly and async push dispatch, including bugs involving shared buffer pool references. Introduce `ShuffleClient.computeBatchCRC()` and call it immediately before each assembled batch enters the async push pipeline, at 7 call sites across 3 classes: `HashBasedShuffleWriter` (spark-2/3) at `flushSendBuffer()`, `pushGiantRecord()`, and the per-partition flush in `close()`; and `SortBasedPusher` at the partition-change flush, buffer-overflow flush, `pushGiantRecord()`, and final flush. The now-redundant CRC computation inside `pushOrMergeData()` is removed. This approach is less elegant than the original design, which had a single CRC call site inside `pushOrMergeData()` — one place to reason about and maintain. The new design scatters `computeBatchCRC()` across 7 call sites, but the trade-off is justified: the checked zone now starts at batch assembly rather than at async push dispatch, covering more of the data pipeline and enabling detection of a broader class of correctness bugs.
a4ee01e to
f3dd0f0
Compare
|
@RexXiong @SteNicholas Have made corresponding changes, please take a look again. |
There isn't a lot more to it. Essentially we keep both the existing checksum computation, and store the CRC computation being added from the writers (in this PR) separately. And then before we send metadata, we ensure that both the computations match. Otherwise we fail the task Essentially there are two layers (writer and shuffle client) computing their own checksums, and then shuffle client compares them before propagation at mapper end |
|
@RexXiong @SteNicholas Gentle ping :) |
|
@SteNicholas Can you take another look? |
|
@xumingming, please take a look at the comments of claude code: |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
What changes were proposed in this pull request?
Celeborn's E2E integrity check computes CRC_M inside
ShuffleClientImpl.pushOrMergeData(), which runs in the asyncDataPusherthread. This leaves the segment from batch assembly in the writer thread through theDataPusherqueue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently.This change closes that gap and enables detection of a class of correctness bugs where data corruption occurs between batch assembly and async push dispatch, including bugs involving shared buffer pool references.
Introduce
ShuffleClient.computeBatchCRC()and call it immediately before each assembled batch enters the async push pipeline, at 7 call sites across 3 classes:HashBasedShuffleWriter(spark-2/3) atflushSendBuffer(),pushGiantRecord(), and the per-partition flush inclose(); andSortBasedPusherat the partition-change flush, buffer-overflow flush,pushGiantRecord(), and final flush. The now-redundant CRC computation insidepushOrMergeData()is removed.This approach is less elegant than the original design, which had a single CRC call site inside
pushOrMergeData()— one place to reason about and maintain. The new design scatterscomputeBatchCRC()across 7 call sites, but the trade-off is justified: the checked zone now starts at batch assembly rather than at async push dispatch, covering more of the data pipeline and enabling detection of a broader class of correctness bugs.Why are the changes needed?
Celeborn's E2E integrity check computes CRC_M inside
ShuffleClientImpl.pushOrMergeData(), which runs in the asyncDataPusherthread. This leaves the segment from batch assembly in the writer thread through theDataPusherqueue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently.Does this PR resolve a correctness bug?
It could help us detect more correctness bug.
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT